home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_queue.py < prev    next >
Text File  |  2005-10-18  |  9KB  |  238 lines

  1. # Some simple Queue module tests, plus some failure conditions
  2. # to ensure the Queue locks remain stable.
  3. import Queue
  4. import sys
  5. import threading
  6. import time
  7.  
  8. from test.test_support import verify, TestFailed, verbose
  9.  
  10. QUEUE_SIZE = 5
  11.  
  12. # A thread to run a function that unclogs a blocked Queue.
  13. class _TriggerThread(threading.Thread):
  14.     def __init__(self, fn, args):
  15.         self.fn = fn
  16.         self.args = args
  17.         self.startedEvent = threading.Event()
  18.         threading.Thread.__init__(self)
  19.  
  20.     def run(self):
  21.         # The sleep isn't necessary, but is intended to give the blocking
  22.         # function in the main thread a chance at actually blocking before
  23.         # we unclog it.  But if the sleep is longer than the timeout-based
  24.         # tests wait in their blocking functions, those tests will fail.
  25.         # So we give them much longer timeout values compared to the
  26.         # sleep here (I aimed at 10 seconds for blocking functions --
  27.         # they should never actually wait that long - they should make
  28.         # progress as soon as we call self.fn()).
  29.         time.sleep(0.1)
  30.         self.startedEvent.set()
  31.         self.fn(*self.args)
  32.  
  33. # Execute a function that blocks, and in a separate thread, a function that
  34. # triggers the release.  Returns the result of the blocking function.
  35. # Caution:  block_func must guarantee to block until trigger_func is
  36. # called, and trigger_func must guarantee to change queue state so that
  37. # block_func can make enough progress to return.  In particular, a
  38. # block_func that just raises an exception regardless of whether trigger_func
  39. # is called will lead to timing-dependent sporadic failures, and one of
  40. # those went rarely seen but undiagnosed for years.  Now block_func
  41. # must be unexceptional.  If block_func is supposed to raise an exception,
  42. # call _doExceptionalBlockingTest() instead.
  43. def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
  44.     t = _TriggerThread(trigger_func, trigger_args)
  45.     t.start()
  46.     result = block_func(*block_args)
  47.     # If block_func returned before our thread made the call, we failed!
  48.     if not t.startedEvent.isSet():
  49.         raise TestFailed("blocking function '%r' appeared not to block" %
  50.                          block_func)
  51.     t.join(10) # make sure the thread terminates
  52.     if t.isAlive():
  53.         raise TestFailed("trigger function '%r' appeared to not return" %
  54.                          trigger_func)
  55.     return result
  56.  
  57. # Call this instead if block_func is supposed to raise an exception.
  58. def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
  59.                                trigger_args, expected_exception_class):
  60.     t = _TriggerThread(trigger_func, trigger_args)
  61.     t.start()
  62.     try:
  63.         try:
  64.             block_func(*block_args)
  65.         except expected_exception_class:
  66.             raise
  67.         else:
  68.             raise TestFailed("expected exception of kind %r" %
  69.                              expected_exception_class)
  70.     finally:
  71.         t.join(10) # make sure the thread terminates
  72.         if t.isAlive():
  73.             raise TestFailed("trigger function '%r' appeared to not return" %
  74.                              trigger_func)
  75.         if not t.startedEvent.isSet():
  76.             raise TestFailed("trigger thread ended but event never set")
  77.  
  78. # A Queue subclass that can provoke failure at a moment's notice :)
  79. class FailingQueueException(Exception):
  80.     pass
  81.  
  82. class FailingQueue(Queue.Queue):
  83.     def __init__(self, *args):
  84.         self.fail_next_put = False
  85.         self.fail_next_get = False
  86.         Queue.Queue.__init__(self, *args)
  87.     def _put(self, item):
  88.         if self.fail_next_put:
  89.             self.fail_next_put = False
  90.             raise FailingQueueException, "You Lose"
  91.         return Queue.Queue._put(self, item)
  92.     def _get(self):
  93.         if self.fail_next_get:
  94.             self.fail_next_get = False
  95.             raise FailingQueueException, "You Lose"
  96.         return Queue.Queue._get(self)
  97.  
  98. def FailingQueueTest(q):
  99.     if not q.empty():
  100.         raise RuntimeError, "Call this function with an empty queue"
  101.     for i in range(QUEUE_SIZE-1):
  102.         q.put(i)
  103.     # Test a failing non-blocking put.
  104.     q.fail_next_put = True
  105.     try:
  106.         q.put("oops", block=0)
  107.         raise TestFailed("The queue didn't fail when it should have")
  108.     except FailingQueueException:
  109.         pass
  110.     q.fail_next_put = True
  111.     try:
  112.         q.put("oops", timeout=0.1)
  113.         raise TestFailed("The queue didn't fail when it should have")
  114.     except FailingQueueException:
  115.         pass
  116.     q.put("last")
  117.     verify(q.full(), "Queue should be full")
  118.     # Test a failing blocking put
  119.     q.fail_next_put = True
  120.     try:
  121.         _doBlockingTest(q.put, ("full",), q.get, ())
  122.         raise TestFailed("The queue didn't fail when it should have")
  123.     except FailingQueueException:
  124.         pass
  125.     # Check the Queue isn't damaged.
  126.     # put failed, but get succeeded - re-add
  127.     q.put("last")
  128.     # Test a failing timeout put
  129.     q.fail_next_put = True
  130.     try:
  131.         _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
  132.                                    FailingQueueException)
  133.         raise TestFailed("The queue didn't fail when it should have")
  134.     except FailingQueueException:
  135.         pass
  136.     # Check the Queue isn't damaged.
  137.     # put failed, but get succeeded - re-add
  138.     q.put("last")
  139.     verify(q.full(), "Queue should be full")
  140.     q.get()
  141.     verify(not q.full(), "Queue should not be full")
  142.     q.put("last")
  143.     verify(q.full(), "Queue should be full")
  144.     # Test a blocking put
  145.     _doBlockingTest( q.put, ("full",), q.get, ())
  146.     # Empty it
  147.     for i in range(QUEUE_SIZE):
  148.         q.get()
  149.     verify(q.empty(), "Queue should be empty")
  150.     q.put("first")
  151.     q.fail_next_get = True
  152.     try:
  153.         q.get()
  154.         raise TestFailed("The queue didn't fail when it should have")
  155.     except FailingQueueException:
  156.         pass
  157.     verify(not q.empty(), "Queue should not be empty")
  158.     q.fail_next_get = True
  159.     try:
  160.         q.get(timeout=0.1)
  161.         raise TestFailed("The queue didn't fail when it should have")
  162.     except FailingQueueException:
  163.         pass
  164.     verify(not q.empty(), "Queue should not be empty")
  165.     q.get()
  166.     verify(q.empty(), "Queue should be empty")
  167.     q.fail_next_get = True
  168.     try:
  169.         _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
  170.                                    FailingQueueException)
  171.         raise TestFailed("The queue didn't fail when it should have")
  172.     except FailingQueueException:
  173.         pass
  174.     # put succeeded, but get failed.
  175.     verify(not q.empty(), "Queue should not be empty")
  176.     q.get()
  177.     verify(q.empty(), "Queue should be empty")
  178.  
  179. def SimpleQueueTest(q):
  180.     if not q.empty():
  181.         raise RuntimeError, "Call this function with an empty queue"
  182.     # I guess we better check things actually queue correctly a little :)
  183.     q.put(111)
  184.     q.put(222)
  185.     verify(q.get() == 111 and q.get() == 222,
  186.            "Didn't seem to queue the correct data!")
  187.     for i in range(QUEUE_SIZE-1):
  188.         q.put(i)
  189.         verify(not q.empty(), "Queue should not be empty")
  190.     verify(not q.full(), "Queue should not be full")
  191.     q.put("last")
  192.     verify(q.full(), "Queue should be full")
  193.     try:
  194.         q.put("full", block=0)
  195.         raise TestFailed("Didn't appear to block with a full queue")
  196.     except Queue.Full:
  197.         pass
  198.     try:
  199.         q.put("full", timeout=0.01)
  200.         raise TestFailed("Didn't appear to time-out with a full queue")
  201.     except Queue.Full:
  202.         pass
  203.     # Test a blocking put
  204.     _doBlockingTest(q.put, ("full",), q.get, ())
  205.     _doBlockingTest(q.put, ("full", True, 10), q.get, ())
  206.     # Empty it
  207.     for i in range(QUEUE_SIZE):
  208.         q.get()
  209.     verify(q.empty(), "Queue should be empty")
  210.     try:
  211.         q.get(block=0)
  212.         raise TestFailed("Didn't appear to block with an empty queue")
  213.     except Queue.Empty:
  214.         pass
  215.     try:
  216.         q.get(timeout=0.01)
  217.         raise TestFailed("Didn't appear to time-out with an empty queue")
  218.     except Queue.Empty:
  219.         pass
  220.     # Test a blocking get
  221.     _doBlockingTest(q.get, (), q.put, ('empty',))
  222.     _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
  223.  
  224. def test():
  225.     q = Queue.Queue(QUEUE_SIZE)
  226.     # Do it a couple of times on the same queue
  227.     SimpleQueueTest(q)
  228.     SimpleQueueTest(q)
  229.     if verbose:
  230.         print "Simple Queue tests seemed to work"
  231.     q = FailingQueue(QUEUE_SIZE)
  232.     FailingQueueTest(q)
  233.     FailingQueueTest(q)
  234.     if verbose:
  235.         print "Failing Queue tests seemed to work"
  236.  
  237. test()
  238.